package com.sophie.chapter2;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Demonstrates several ways of assembling Kafka producer configuration and
 * sends a single demo record to the {@code heima} topic.
 */
public class KafkaProducerAnalysis {

    /** Bootstrap address of the Kafka broker(s) the producer connects to. */
    private static final String BROKER_LIST = "localhost:9092";

    /** Topic the demo record is published to. */
    private static final String TOPIC = "heima";

    /**
     * Builds a producer configuration using raw string keys.
     *
     * @return properties holding the key/value serializers, the retry count
     *         and the bootstrap servers
     */
    public static Properties initConfig() {
        Properties properties = new Properties();
        // Key serializer
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Number of retries
        properties.put(ProducerConfig.RETRIES_CONFIG, 10);
        // Value serializer
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Cluster address
        properties.put("bootstrap.servers", BROKER_LIST);
        return properties;
    }

    /**
     * Builds a producer configuration using the {@link ProducerConfig} constants
     * and additionally registers a client id, a custom partitioner, a custom
     * interceptor and the acknowledgement mode.
     *
     * @return properties ready to be handed to a {@link KafkaProducer}
     */
    public static Properties initNewConfig() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.RETRIES_CONFIG, 10);
        // Client id of this producer. It defaults to empty; when left unset the
        // client generates a non-empty one of the form "producer-1".
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");

        // Custom partitioner
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefinePartitioner.class.getName());

        // Custom interceptor
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());

        // Acknowledgement mode. "acks" is a STRING-typed setting, so the value
        // must be the String "0" / "1" (etc.); supplying the Integer 0 makes the
        // KafkaProducer constructor fail with a ConfigException.
        properties.put(ProducerConfig.ACKS_CONFIG, "0");
        return properties;
    }

    /**
     * Builds the minimal producer configuration: bootstrap servers plus the
     * key and value serializers.
     *
     * @return properties holding only the mandatory producer settings
     */
    public static Properties initPerferConfig() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }

    /**
     * Creates a producer from {@link #initNewConfig()}, sends one record and
     * closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties properties = initNewConfig();

        // Build the record, specifying topic, key and value.
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "Kafka-demo-001", "hello, Kafka!");

        // The serializers could alternatively be passed to the constructor:
        //   new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer());
        //
        // KafkaProducer is thread-safe. It is opened in try-with-resources so that
        // close() always runs: send() only queues the record, and without close()
        // the JVM may exit before the record is actually transmitted.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            try {
                // 1. Fire-and-forget send.
                producer.send(record);

                // 2. Synchronous send: send() returns a Future; calling get() on it
                //    waits for the broker's response. On success it yields a
                //    RecordMetadata carrying the record's offset; on failure it
                //    throws, which allows the error to be handled here.
                //      producer.send(record).get();

                // 3. Asynchronous send with a callback:
                //      producer.send(record, (recordMetadata, e) -> {
                //          if (e == null) {
                //              System.out.println(recordMetadata.partition() + ":" + recordMetadata.offset());
                //          }
                //      });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
